package com.demo.sync;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.Charset;
import java.util.List;
import java.util.Set;

public class PullConsumerManualBalance {
    public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException {
        DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer("sync_producer_group_test1");
        defaultMQPullConsumer.setNamesrvAddr("localhost:9876");
        defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING);
        defaultMQPullConsumer.start();
        while(!Thread.currentThread().isInterrupted()){
            Set<MessageQueue> messageQueueSet = defaultMQPullConsumer.fetchMessageQueuesInBalance("sync_TopicTest");
            if(messageQueueSet == null || messageQueueSet.size() <= 0){
                try {
                    Thread.currentThread().sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                continue;
            }
            for(MessageQueue messageQueue : messageQueueSet){
                Long offset = defaultMQPullConsumer.fetchConsumeOffset(messageQueue, false);
                PullResult pullResult = null;
                try {
                    pullResult = defaultMQPullConsumer.pullBlockIfNotFound(messageQueue, "*", offset, 1);
                }  catch (InterruptedException e) {
                    e.printStackTrace();
                }
                PullStatus statue = pullResult.getPullStatus();
                switch (statue){
                    case FOUND:
                        List<MessageExt> messages = pullResult.getMsgFoundList();
                        for (int i = 0; i < messages.size(); i++) {
                            MessageClientExt messageClientExt = (MessageClientExt) messages.get(i);
                            System.out.printf(Thread.currentThread().getName() + " Receive New Messages: "
                                    + new String(messageClientExt.getBody(), Charset.forName("UTF-8")) + "%n");
                        }
                    case NO_NEW_MSG:
                        ;
                    default:
                        ;
                }
                defaultMQPullConsumer.updateConsumeOffset(messageQueue,pullResult.getNextBeginOffset());
            }
            try {
                Thread.currentThread().sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
}

/*
fetchMessageQueuesInBalance总是不能得到分配的queue
 */